/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package com.aliyun.odps.mapred.lib;

import static com.aliyun.odps.mapred.utils.UTF8ByteArrayUtils.unescapeSeparator;

import java.io.UnsupportedEncodingException;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.Partitioner;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.lib.KeyFieldHelper.KeyDescription;

/**
 * Defines a way to partition keys based on certain key fields (also see
 * {@link KeyFieldBasedComparator}).
 * The key specification supported is of the form -k pos1[,pos2], where
 * pos is of the form f[.c][opts], where f is the number
 * of the key field to use, and c is the number of the first character from
 * the beginning of the field. Fields and character positions are numbered
 * starting with 1; a character position of zero in pos2 indicates the
 * field's last character. If '.c' is omitted from pos1, it defaults to 1
 * (the beginning of the field); if omitted from pos2, it defaults to 0
 * (the end of the field).
 * <p>
 * Note: this implementation does not read a -k option from the job
 * configuration. It hashes the first column of the key record, restricted to
 * the key fields selected by {@code num.key.fields.for.partition}. Fields are
 * split on {@code map.output.key.field.separator} (default: tab).
 */
public class KeyFieldBasedPartitioner extends Partitioner {

  private static final Log LOG = LogFactory.getLog(
      KeyFieldBasedPartitioner.class.getName());

  /** Value of {@code num.key.fields.for.partition} read in {@link #configure}. */
  private int numOfPartitionFields;

  /** Splits the key into fields and holds the key specs used for hashing. */
  private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();

  /**
   * Configures the partitioner from the job configuration.
   *
   * <p>Reads the field separator from {@code map.output.key.field.separator}
   * (default tab, passed through {@code unescapeSeparator}). Reads the number of
   * partition fields from {@code num.key.fields.for.partition} (default 0), and
   * registers the key spec {@code (1, numOfPartitionFields)} with the helper.
   * NOTE(review): the Hadoop-style {@code mapreduce.partition.keypartitioner.options}
   * spec is not supported here; only the legacy field-count key is read.
   *
   * @param conf the job configuration
   */
  @Override
  public void configure(JobConf conf) {
    // Start from a fresh helper so re-configuration does not accumulate specs.
    keyFieldHelper = new KeyFieldHelper();
    String keyFieldSeparator =
        unescapeSeparator(conf.get("map.output.key.field.separator", "\t"));
    keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
    this.numOfPartitionFields = conf.getInt("num.key.fields.for.partition", 0);
    keyFieldHelper.setKeyFieldSpec(1, numOfPartitionFields);
  }

  /**
   * Chooses a partition for a record from the first column of its key record.
   *
   * <p>If no key specs are registered, the whole key string's
   * {@link String#hashCode()} is used. Otherwise the UTF-8 bytes of each
   * selected key field are folded into one hash with {@link #hashCode(byte[], int, int, int)}.
   * A null or empty key always goes to partition 0.
   *
   * @param keyRec the map output key record; only column 0 is consulted
   * @param valueRec the map output value record (not used)
   * @param numReduceTasks number of partitions; must be positive
   * @return a partition index in {@code [0, numReduceTasks)}
   */
  public int getPartition(Record keyRec, Record valueRec, int numReduceTasks) {
    Object key = keyRec.get(0);
    // A null key (e.g. a NULL column value) is treated like an empty key
    // rather than failing with a NullPointerException on toString().
    if (key == null) {
      return 0;
    }
    String keyString = key.toString();

    List<KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
    if (allKeySpecs.isEmpty()) {
      // No field selection configured: hash the entire key string.
      return getPartition(keyString.hashCode(), numReduceTasks);
    }

    byte[] keyBytes;
    try {
      keyBytes = keyString.getBytes("UTF-8");
    } catch (UnsupportedEncodingException e) {
      throw new RuntimeException("The current system does not " +
                                 "support UTF-8 encoding!", e);
    }
    // return 0 if the key is empty
    if (keyBytes.length == 0) {
      return 0;
    }

    int[] lengthIndicesFirst = keyFieldHelper.getWordLengths(keyBytes, 0,
                                                             keyBytes.length);
    int currentHash = 0;
    for (KeyDescription keySpec : allKeySpecs) {
      int startChar = keyFieldHelper.getStartOffset(keyBytes, 0,
                                                    keyBytes.length, lengthIndicesFirst, keySpec);
      // no key found! continue
      if (startChar < 0) {
        continue;
      }
      int endChar = keyFieldHelper.getEndOffset(keyBytes, 0, keyBytes.length,
                                                lengthIndicesFirst, keySpec);
      currentHash = hashCode(keyBytes, startChar, endChar,
                             currentHash);
    }
    return getPartition(currentHash, numReduceTasks);
  }

  /**
   * Folds the bytes {@code b[start..end]} (both ends inclusive) into a running
   * hash using the {@code 31 * h + x} recurrence. Bytes are used as signed values.
   *
   * @param b the byte buffer
   * @param start first index to include
   * @param end last index to include
   * @param currentHash the hash accumulated so far
   * @return the updated hash
   */
  protected int hashCode(byte[] b, int start, int end, int currentHash) {
    for (int i = start; i <= end; i++) {
      currentHash = 31 * currentHash + b[i];
    }
    return currentHash;
  }

  /**
   * Maps a hash to a partition index. The sign bit is cleared first so that the
   * result is non-negative.
   *
   * @param hash the hash value
   * @param numReduceTasks number of partitions; must be positive (0 causes an
   *        {@link ArithmeticException})
   * @return a partition index in {@code [0, numReduceTasks)}
   */
  protected int getPartition(int hash, int numReduceTasks) {
    return (hash & Integer.MAX_VALUE) % numReduceTasks;
  }
}
